Snowflakeの継続的なデータロード「Snowpipe」を試してみた
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
Snowflakeには継続的にデータをロードするための仕組みとして、「Snowpipe」という機能があります。この機能をこれまで実際に触ったことがなかったので、今回は設定からデータをロードするまでを試してみました。
なお、「Snowpipe」については以下の記事でも紹介されていますので、よろしければご参照ください。
Snowpipeとは
SnowpipeはSnowflakeの継続的なデータロードの仕組みです。COPYコマンドを利用した一括ロードとは異なり、ステージ上でファイルが利用できるようになったことを検知して、ファイルを継続的にロードすることができます。
Snowpipeを設定してみる
実際にどのようなものなのか、設定してみて確認したいと思います。今回はAWSの「S3イベント通知」をトリガーとしてSnowpipeがデータロードをするようにしたいと思います。
なお、今回は一般的なオプションとされている「S3イベント通知」を利用した設定を行いますが、S3バケットに競合するイベント通知が存在する場合には、代わりに「SNSトピック」を利用したパターンでの実装が必要となります。
詳しくは以下のドキュメントの「正しいオプションの決定」の項に記載されています。
ステージの準備
ではドキュメントに沿って作成を進めてみます。まずはステージの準備です。
S3からのロードになるので外部ステージの設定が必要となるわけなのですが、ここは設定手順が長くなるので記述を割愛したいと思います。以前に以下の記事で実施したので同様に設定を行いました。
今回は外部ステージとして OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_EXTERNAL_STAGE
というステージを作成しています。
パイプの準備
次にパイプの準備をします。指定テーブルに対してCOPYを行うパイプを作成するのですが、まず先にテーブルを作成しておきます。
以下のように、VARIANT型のカラムだけを持つテーブルを作成しておきます。
CREATE TABLE OOTAKA_SANDBOX_DB.PUBLIC.LOG( record VARIANT );
作成したら、このテーブルに対してCOPYを行うパイプを作成します。
CREATE PIPE OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_PIPE AUTO_INGEST=TRUE AS COPY INTO OOTAKA_SANDBOX_DB.PUBLIC.LOG FROM @OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_EXTERNAL_STAGE/Snowpipe/ FILE_FORMAT = (TYPE = 'JSON') ;
指定外部ステージCM_OOTAKA_EXTERNAL_STAGE
の配下にあるSnowpipe
フォルダ内のオブジェクトを、JSON
フォーマットでCOPYするPIPEです。
AUTO_INGEST=TRUE
としていますが、これにより新しいデータをロードする準備ができたときに、S3バケットからSQSキューに送信されたイベント通知を読み取るようになるそうです。
権限設定
次に各オブジェクトの権限設定です。Snowpipeを利用するためには以下の権限を持つロールが必要となります。
オブジェクト | 権限 |
---|---|
名前付きパイプ | OWNERSHIP |
名前付きステージ | USAGE, READ |
名前付きファイル形式 | USAGE(*) |
ターゲットデータベース | USAGE |
ターゲットスキーマ | USAGE |
ターゲットテーブル | INSERT, SELECT |
(*)作成したステージが名前付きファイル形式を参照する場合にのみ必要
今回利用するロールOOTAKA_SANDBOX_ROLE
を改めて確認します。「名前付きファイル形式」は利用していないので、それ以外をクエリで確認していきます。
名前付きパイプ
名前付きパイプについては以下のクエリで確認します。
SELECT PIPE_CATALOG, PIPE_SCHEMA, PIPE_NAME, PIPE_OWNER FROM OOTAKA_SANDBOX_DB.INFORMATION_SCHEMA.PIPES WHERE PIPE_NAME = 'CM_OOTAKA_PIPE' ;
結果は以下になります。
PIPE_CATALOG | PIPE_SCHEMA | PIPE_NAME | PIPE_OWNER |
---|---|---|---|
OOTAKA_SANDBOX_DB | PUBLIC | CM_OOTAKA_PIPE | OOTAKA_SANDBOX_ROLE |
PIPE_OWNER
なのでOWNERSHIP
権限があります。
名前付きステージ
名前付きステージについては以下のクエリで確認します。
SELECT STAGE_CATALOG, STAGE_SCHEMA, STAGE_NAME, STAGE_OWNER FROM OOTAKA_SANDBOX_DB.INFORMATION_SCHEMA.STAGES WHERE STAGE_NAME = 'CM_OOTAKA_EXTERNAL_STAGE' ;
結果は以下になります。
STAGE_CATALOG | STAGE_SCHEMA | STAGE_NAME | STAGE_OWNER |
---|---|---|---|
OOTAKA_SANDBOX_DB | PUBLIC | CM_OOTAKA_EXTERNAL_STAGE | OOTAKA_SANDBOX_ROLE |
STAGE_OWNER
なのでUSAGE
とREAD
権限も持っています。
ターゲットデータベース、ターゲットスキーマ、ターゲットテーブル
データベース、スキーマ、テーブルについては以下のクエリで確認します。
SELECT GRANTEE, OBJECT_CATALOG, OBJECT_NAME, OBJECT_TYPE, PRIVILEGE_TYPE FROM OOTAKA_SANDBOX_DB.INFORMATION_SCHEMA.OBJECT_PRIVILEGES WHERE GRANTEE = 'OOTAKA_SANDBOX_ROLE' AND ( (OBJECT_TYPE = 'DATABASE' AND OBJECT_NAME = 'OOTAKA_SANDBOX_DB' AND PRIVILEGE_TYPE = 'USAGE') OR (OBJECT_TYPE = 'SCHEMA' AND OBJECT_CATALOG = 'OOTAKA_SANDBOX_DB' AND OBJECT_NAME = 'PUBLIC' AND PRIVILEGE_TYPE = 'USAGE') OR (OBJECT_TYPE = 'TABLE' AND OBJECT_CATALOG = 'OOTAKA_SANDBOX_DB' AND OBJECT_NAME = 'LOG' ) ) ORDER BY OBJECT_TYPE, PRIVILEGE_TYPE ;
結果は以下になります。
GRANTEE | OBJECT_CATALOG | OBJECT_NAME | OBJECT_TYPE | PRIVILEGE_TYPE |
---|---|---|---|---|
OOTAKA_SANDBOX_ROLE | OOTAKA_SANDBOX_DB | DATABASE | USAGE | |
OOTAKA_SANDBOX_ROLE | OOTAKA_SANDBOX_DB | PUBLIC | SCHEMA | USAGE |
OOTAKA_SANDBOX_ROLE | OOTAKA_SANDBOX_DB | LOG | TABLE | OWNERSHIP |
データベース、スキーマは想定通りUSAGE
権限を持っており、テーブルはOWNERSHIP
権限があるのでINSERT
、SELECT
に問題はありません。
以上で、権限に問題ないことが確認できました!
イベント通知設定
イベント通知設定では、AWSのS3イベント通知の設定を行っていきます。
まずは以下のコマンドで設定に必要なARNを取得します。
SHOW PIPES;
ここで表示された、利用するPIPEのSQSのARNを控えておきます。以下のような値になります。
arn:aws:sqs:ap-northeast-1:123456789012:sf-snowpipe-foo_bar
次に、AWSの管理コンソールに移り、外部ステージとして利用するS3バケットの画面を開き、「プロパティ」タブの「詳細設定」から「イベント」を選択します。
選択すると「+通知の追加」が表示されるのでクリックします。
設定は以下のように設定します。
「名前」には任意の名前を、「イベント」には「すべてのオブジェクト作成イベント」を指定します。
「プレフィックス」には「パイプの準備」の項で記載した通り、今回はSnowpipe
というフォルダ配下のファイルをロードしたいので、Snowpipe/
と指定します。
「送信先」と「SQS」には「SQLキュー」と「SQSキューのARNを追加」を指定して、「SQSキューのARN」に先程Snowflakeで確認したARNを指定して「保存」します。
履歴データのロード
今回はこれから初めてデータをロードすることになるので関係がないのですが、以前にステージングされたファイルがある場合には、以下の手順で履歴データをロードすることができるそうです。
COPY INTO の実行
これはPIPE作成時に設定したCOPY文の実行ですね。今回の例だと以下になります。
COPY INTO OOTAKA_SANDBOX_DB.PUBLIC.LOG FROM @OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_EXTERNAL_STAGE/Snowpipe/ FILE_FORMAT = (TYPE = 'JSON') ;
後述のALTER PIPE ... REFRESH
では過去7日間にステージングされたファイルを処理してくれるので、それより以前のファイルを考慮した処理になるということですね。
自動データロードを構成
これは、まさにいま設定してきたSnowpipeの設定です。
ALTER PIPE ... REFRESH の実行
この実行により、上記の「COPY INTOの実行」から、「自動データロードを構成」までの期間にステージングされたファイルがロードされます。
今回の例だと以下になります。なお、ターゲットテーブルとパイプの両方のロード履歴がチェックされるので、同じファイルが2回ロードされることはないそうです。
ALTER PIPE OOTAKA_SANDBOX_DB.PUBLIC.CM_OOTAKA_PIPE REFRESH;
Snowpipeを動かしてみる
以上で、設定が終わりました!早速試してみましょう。
ここでは詳細は省きますが、Kinesis FirehoseのDelivery Streamを作成して、データの取り込み対象のS3パスにテストデータを送り込んでみます。テストデータは「Test with demo data」から送信できます。
しばらくすると以下のようにファイルが送信されたのが確認できたので、送信を止めました。
なお、ファイルの中身は以下のようなJSONL形式のデータとなっています。
{"ticker_symbol":"ALY","sector":"ENERGY","change":-1.01,"price":88.09} {"ticker_symbol":"TBV","sector":"HEALTHCARE","change":8.62,"price":199.62} {"ticker_symbol":"ASD","sector":"FINANCIAL","change":-1.39,"price":64.41} ...
では、Snowflake側でロードされているかを確認してみます。
SELECT * FROM OOTAKA_SANDBOX_DB.PUBLIC.LOG;
想定どおり、ちゃんとロードされていますね!
まとめ
以上、Snowflakeの継続的なデータロード「Snowpipe」を試してみました。Kinesis Firehoseのように、継続的にS3に配置されるデータのロードにとても便利そうな機能ですね。うまく活用できるように利用方法について勉強していきたいと思います。
どなたかのお役に立てば幸いです。それでは!